import os
import re

from torch_npu.utils._error_code import ErrCode, prof_error
from ._constant import Constant

__all__ = []


class ProfilerPathManager:
    """Locate files and directories inside a profiler output tree.

    Every method is a stateless classmethod. The layout the lookups expect is::

        <profiler_path>/
            <Constant.FRAMEWORK_DIR>/
            PROF_<digits>_<digits>_<alnum>.../   (the "CANN" directory)
                host/
                device_<N>/
            profiler_info.json | profiler_info_<digits>.json

    "Not found" is reported with ``""`` (or an empty list). Errors raised by
    the underlying ``os`` calls (e.g. listing a missing directory) propagate.
    """

    # Compiled once; the pattern text is what callers have always matched on.
    _CANN_DIR_PATTERN = re.compile(r"^PROF_\d+_\d+_[0-9a-zA-Z]+")
    _INFO_FILE_PATTERN = re.compile(r"^profiler_info_\d+\.json")
    _DEVICE_DIR_PATTERN = re.compile(r"^device_\d")

    @classmethod
    def _existing_or_empty(cls, path: str) -> str:
        """Return *path* if it exists on disk, otherwise ``""``."""
        return path if os.path.exists(path) else ""

    @classmethod
    def _list_children(cls, dir_path: str) -> list:
        """Return every entry of *dir_path*, each joined onto *dir_path*."""
        return [os.path.join(dir_path, name) for name in os.listdir(os.path.realpath(dir_path))]

    @classmethod
    def get_fwk_path(cls, profiler_path: str) -> str:
        """Return ``<profiler_path>/<Constant.FRAMEWORK_DIR>`` if it is a directory, else ``""``."""
        fwk_path = os.path.join(profiler_path, Constant.FRAMEWORK_DIR)
        return fwk_path if os.path.isdir(fwk_path) else ""

    @classmethod
    def get_cann_path(cls, profiler_path: str) -> str:
        """Return the first ``PROF_*`` sub-directory of *profiler_path*, else ``""``.

        "First" follows ``os.listdir`` order. The returned path is built from
        *profiler_path* as given, not from its resolved real path.
        """
        for sub_dir in os.listdir(os.path.realpath(profiler_path)):
            sub_path = os.path.join(profiler_path, sub_dir)
            if os.path.isdir(sub_path) and cls._CANN_DIR_PATTERN.match(sub_dir):
                return sub_path
        return ""

    @classmethod
    def get_info_file_path(cls, profiler_path: str) -> str:
        """Return the first ``profiler_info.json`` / ``profiler_info_<digits>.json`` entry, else ``""``."""
        for sub_file in os.listdir(profiler_path):
            if sub_file == "profiler_info.json" or cls._INFO_FILE_PATTERN.match(sub_file):
                return os.path.join(profiler_path, sub_file)
        return ""

    @classmethod
    def get_info_path(cls, profiler_path: str) -> str:
        """Return ``<cann dir>/host/info.json`` if it exists, else ``""``."""
        cann_path = cls.get_cann_path(profiler_path)
        if not cann_path:
            # Joining onto "" would yield a path relative to the current
            # working directory and could match an unrelated file there.
            return ""
        return cls._existing_or_empty(os.path.join(cann_path, 'host/info.json'))

    @classmethod
    def get_host_start_log_path(cls, profiler_path: str) -> str:
        """Return ``<cann dir>/host/host_start.log`` if it exists, else ``""``."""
        cann_path = cls.get_cann_path(profiler_path)
        if not cann_path:
            # Same reasoning as get_info_path: never fall back to the CWD.
            return ""
        return cls._existing_or_empty(os.path.join(cann_path, 'host/host_start.log'))

    @classmethod
    def get_host_path(cls, cann_path: str) -> str:
        """Return ``<cann_path>/host`` if it exists, else ``""`` (also for an empty *cann_path*)."""
        if not cann_path:
            return ""
        return cls._existing_or_empty(os.path.join(cann_path, 'host'))

    @classmethod
    def get_device_path(cls, cann_path: str) -> list:
        """Return all sub-directories of *cann_path* whose name starts with ``device_<digit>``.

        An empty *cann_path* yields ``[]`` instead of scanning the current
        working directory. Order follows ``os.listdir``.
        """
        if not cann_path:
            return []
        return [
            sub_path
            for sub_path in cls._list_children(cann_path)
            if os.path.isdir(sub_path) and cls._DEVICE_DIR_PATTERN.match(os.path.basename(sub_path))
        ]

    @classmethod
    def get_device_id(cls, cann_path: str) -> list:
        """Return the integer ids parsed from the ``device_<id>`` directories.

        If any matched directory name is not exactly ``device_<digits>`` the
        whole result is ``[]``.
        """
        device_id_list = []
        for device_path in cls.get_device_path(cann_path):
            device_path_split = os.path.basename(device_path).split("_")
            if len(device_path_split) != 2 or not device_path_split[1].isdigit():
                return []
            device_id_list.append(int(device_path_split[1]))
        return device_id_list

    @classmethod
    def get_start_info_path(cls, cann_path: str) -> str:
        """Return the start-info file for *cann_path*, else ``""``.

        ``<cann_path>/host/start_info`` is preferred. Otherwise the first
        device directory is checked for ``start_info.<id>``.
        """
        if not cann_path:
            # Avoid probing "host/start_info" relative to the CWD.
            return ""
        start_info_path = os.path.join(cann_path, "host", "start_info")
        if os.path.exists(start_info_path):
            return start_info_path
        device_paths = cls.get_device_path(cann_path)
        if not device_paths:
            return ""
        device_path_split = os.path.basename(device_paths[0]).split("_")
        if len(device_path_split) != 2:
            return ""
        start_info_file = f"start_info.{device_path_split[1]}"
        return cls._existing_or_empty(os.path.join(device_paths[0], start_info_file))

    @classmethod
    def get_profiler_path_list(cls, input_path: str) -> list:
        """Return the profiler result directories found at or directly under *input_path*.

        A directory counts as a profiler result if it has a framework
        directory or a ``PROF_*`` directory. If *input_path* itself qualifies
        it is returned alone; otherwise its immediate sub-directories are
        checked. A non-directory *input_path* yields ``[]``.
        """
        if not os.path.isdir(input_path):
            return []
        if cls.get_fwk_path(input_path) or cls.get_cann_path(input_path):
            return [input_path]
        profiler_path_list = []
        for sub_dir in os.listdir(os.path.realpath(input_path)):
            sub_path = os.path.join(input_path, sub_dir)
            if not os.path.isdir(sub_path):
                continue
            if cls.get_fwk_path(sub_path) or cls.get_cann_path(sub_path):
                profiler_path_list.append(sub_path)
        return profiler_path_list

    @classmethod
    def get_output_all_file_list_by_type(cls, profiler_path: str, mindstudio_profiler_output: str) -> list:
        """Return every entry of ``<profiler_path>/<mindstudio_profiler_output>``, or ``[]`` if it is not a directory."""
        _path = os.path.join(profiler_path, mindstudio_profiler_output)
        if not os.path.isdir(_path):
            return []
        return cls._list_children(_path)

    @classmethod
    def get_analyze_all_file(cls, profiler_path: str, analyze: str) -> list:
        """Return every entry of ``<profiler_path>/<analyze>``, or ``[]`` if it is not a directory."""
        _path = os.path.join(profiler_path, analyze)
        if not os.path.isdir(_path):
            return []
        return cls._list_children(_path)

    @classmethod
    def get_database_all_file(cls, profiler_path: str) -> list:
        """Return every entry of *profiler_path* joined onto it.

        Unlike the sibling listing methods there is no directory check, so a
        missing *profiler_path* raises from ``os.listdir``.
        """
        return cls._list_children(profiler_path)

    @classmethod
    def get_realpath(cls, path: str) -> str:
        """Expand ``~`` in *path* and return its real path.

        Raises:
            RuntimeError: if the expanded path itself is a symbolic link.
        """
        path = os.path.expanduser(path)
        # NOTE(review): only the final path component is tested; symlinked
        # parent directories (and likely a link given with a trailing
        # separator) are not rejected - confirm this is intended.
        if os.path.islink(path):
            msg = f"Invalid input path is a soft chain: {path}" + prof_error(ErrCode.UNAVAIL)
            raise RuntimeError(msg)
        return os.path.realpath(path)

    @classmethod
    def get_all_subdir(cls, path, max_depth=4, cur_depth=0):
        """Recursively collect sub-directory paths below *path*.

        Recursion stops once ``cur_depth`` exceeds ``max_depth``, so starting
        from ``cur_depth=0`` directories up to ``max_depth + 1`` levels below
        *path* are returned. Each directory is listed before its descendants.
        """
        paths = []
        if cur_depth > max_depth:
            return paths
        with os.scandir(path) as entries:
            for entry in entries:
                if not entry.is_dir():
                    continue
                paths.append(entry.path)
                paths.extend(cls.get_all_subdir(entry.path, max_depth, cur_depth + 1))
        return paths

    @classmethod
    def path_is_other_writable(cls, path):
        """Return True if *path* has the group-write or other-write bit set (mask ``0o022``)."""
        return bool(os.stat(path).st_mode & 0o022)

    @classmethod
    def check_path_permission(cls, path):
        """Return True if *path* is owned by uid 0 or by the current user."""
        return os.stat(path).st_uid in (0, os.getuid())

    @classmethod
    def walk_with_depth(cls, path, max_depth=10, *args, **kwargs):
        """Yield ``(root, dirs, files)`` like ``os.walk`` but limited in depth.

        Depth is measured by counting ``os.sep`` in ``root`` relative to
        *path*. Roots deeper than ``max_depth`` are not yielded and their
        ``dirs`` list is cleared to stop further descent. Extra positional and
        keyword arguments are forwarded to ``os.walk``. A non-``str`` *path*
        yields nothing.
        """
        if not isinstance(path, str):
            return
        base_depth = path.count(os.sep)
        if path.endswith(os.sep):
            base_depth -= 1
        for root, dirs, files in os.walk(path, *args, **kwargs):
            if root.count(os.sep) - base_depth > max_depth:
                # In-place clear is what prunes os.walk's descent.
                dirs.clear()
                continue
            yield root, dirs, files
